Introduction
There are use cases where we would like to get the first or last of something within a group or particular grain. It is natural to do something in SQL like:
select
    col_1,
    first(col_2) as first_something,
    last(col_2) as last_something
from table
group by 1
order by 1
This leads us to writing Spark code like df.orderBy().groupBy().agg(). But this has unexpected behaviour in Spark: the ordering is not guaranteed to survive the groupBy, so the result can be different on each run.
Library Imports
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, Window
Create a SparkSession. There is no need to create a SparkContext, as you automatically get one as part of the SparkSession.
spark = (
SparkSession.builder
.master("local")
.appName("Exploring Joins")
.config("spark.some.config.option", "some-value")
.getOrCreate()
)
sc = spark.sparkContext
Initial Datasets
pets = spark.createDataFrame(
[
    (1, 1, datetime(2018, 1, 1, 1, 1, 1), 'Bear', 5),
    (2, 1, datetime(2010, 1, 1, 1, 1, 1), 'Chewie', 15),
    (3, 1, datetime(2015, 1, 1, 1, 1, 1), 'Roger', 10),
], ['id', 'breed_id', 'birthday', 'nickname', 'age']
)
pets.toPandas()
| | id | breed_id | birthday | nickname | age |
|---|---|---|---|---|---|
| 0 | 1 | 1 | 2018-01-01 01:01:01 | Bear | 5 |
| 1 | 2 | 1 | 2010-01-01 01:01:01 | Chewie | 15 |
| 2 | 3 | 1 | 2015-01-01 01:01:01 | Roger | 10 |
Option 1: Wrong Way
Result 1
df_1 = (
pets
.orderBy('birthday')
.groupBy('breed_id')
.agg(F.first('nickname').alias('first_breed'))
)
df_1.toPandas()
| | breed_id | first_breed |
|---|---|---|
| 0 | 1 | Chewie |
Result 2
df_2 = (
pets
.orderBy('birthday')
.groupBy('breed_id')
.agg(F.first('nickname').alias('first_breed'))
)
df_2.toPandas()
| | breed_id | first_breed |
|---|---|---|
| 0 | 1 | Chewie |
Option 2: Window Object, Right Way
window = Window.partitionBy('breed_id').orderBy('birthday')

# first() needs to see the whole partition, so widen the frame to cover it;
# row_number() only depends on the ordering, so the default frame is fine.
unbounded_window = window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df_3 = (
    pets
    .withColumn('first_breed', F.first('nickname').over(unbounded_window))
    .withColumn('rn', F.row_number().over(window))
)
df_3.toPandas()
| | id | breed_id | birthday | nickname | age | first_breed | rn |
|---|---|---|---|---|---|---|---|
| 0 | 2 | 1 | 2010-01-01 01:01:01 | Chewie | 15 | Chewie | 1 |
| 1 | 3 | 1 | 2015-01-01 01:01:01 | Roger | 10 | Chewie | 2 |
| 2 | 1 | 1 | 2018-01-01 01:01:01 | Bear | 5 | Chewie | 3 |
Summary
Ok, so my example didn't reproduce the problem locally (lol), but trust me: an orderBy() in a statement like orderBy().groupBy() doesn't maintain its order! It just happened to come out right on this tiny local dataset.
reference: https://stackoverflow.com/a/50012355
For any aggregation that depends on an ordering (ie. first, last, etc.), we should avoid using groupBy() and instead use a Window object.